package flinkstudy.stream;

import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 流合并 — demos of combining two streams in Flink: connect + CoFlatMap,
 * broadcast state, and a windowed join.
 *
 * @author daocr
 * @date 2020/2/5
 */
public class JoinStream {

    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;


    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }


    /**
     * Connects a bounded "city config" stream with an unbounded event stream.
     * keyBy(0, 0) routes both sides of the same city id to the same subtask, so
     * the CoFlatMapFunction can enrich events with the matching city name.
     */
    @Test
    public void connect() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                new Tuple2<Integer, String>(1, "湖南"),
                new Tuple2<Integer, String>(2, "湖北"),
                new Tuple2<Integer, String>(3, "上海"));

        DataStreamSource<Tuple2<Integer, String>> source2 = env.addSource(new SourceFunction<Tuple2<Integer, String>>() {

            // Cooperative-cancellation flag: Flink invokes cancel() from a different
            // thread than run(), so the loop must observe the update (volatile).
            // The original looped on `while (true)` with an empty cancel(), which
            // made the source impossible to stop cleanly.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
                while (running) {

                    ThreadLocalRandom current = ThreadLocalRandom.current();

                    // Random city id in [1, 3] — matches the ids emitted by source1.
                    int i = current.nextInt(1, 4);

                    ctx.collect(Tuple2.of(i, UUID.randomUUID().toString()));

                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        source1.connect(source2).keyBy(0, 0)
                .flatMap(new CoFlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

                    // City names seen so far, keyed by city id. A single field would be
                    // clobbered whenever several keys hash to the same subtask, so a map
                    // is needed for correct enrichment. NOTE(review): for fault tolerance
                    // this should be keyed ValueState in a RichCoFlatMapFunction; a plain
                    // map is acceptable for a demo but is lost on restore.
                    private final Map<Integer, String> cityById = new HashMap<>();

                    @Override
                    public void flatMap1(Tuple2<Integer, String> value, Collector<Tuple3<Integer, String, String>> out) throws Exception {
                        System.out.println("flatMap1 value: " + value + "\t thread" + Thread.currentThread().getName());
                        cityById.put(value.f0, value.f1);
                    }

                    @Override
                    public void flatMap2(Tuple2<Integer, String> value, Collector<Tuple3<Integer, String, String>> out) throws Exception {
                        String city = cityById.get(value.f0);
                        System.out.println("flatMap2 value: " + value + " 城市：" + Optional.ofNullable(city).orElse(null) + "\t thread :" + Thread.currentThread().getName());
                        // Emit the enriched record; the original never used the Collector,
                        // so the downstream print() received nothing.
                        out.collect(Tuple3.of(value.f0, city, value.f1));
                    }
                }).print();


        env.execute();

    }


    /**
     * Broadcast-state demo: every parallel instance of the process function
     * receives the full config stream, stores it in BroadcastState, and uses
     * it to enrich the keyed event stream.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void connect2() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        // Descriptor identifying the broadcast state: city id -> city name.
        final MapStateDescriptor<Integer, String> CONFIG_DESCRIPTOR = new MapStateDescriptor<>(
                "cityConfig",
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);


        DataStreamSource<Tuple2<Integer, String>> source1 = env.addSource(new SourceFunction<Tuple2<Integer, String>>() {

            // Cooperative-cancellation flag; see connect() for rationale.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
                while (running) {

                    ThreadLocalRandom current = ThreadLocalRandom.current();

                    // Ids in [1, 4]: id 4 is only broadcast after a 10 s delay below,
                    // which demonstrates enrichment before/after a config update.
                    int i = current.nextInt(1, 5);

                    ctx.collect(Tuple2.of(i, UUID.randomUUID().toString()));

                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        BroadcastStream<Tuple2<Integer, String>> source2 = env.addSource(new SourceFunction<Tuple2<Integer, String>>() {
            @Override
            public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
                ctx.collect(new Tuple2<Integer, String>(1, "湖南"));
                ctx.collect(new Tuple2<Integer, String>(2, "湖北"));
                ctx.collect(new Tuple2<Integer, String>(3, "上海"));

                // Delayed config update: lets the job run for a while with an
                // incomplete mapping before id 4 arrives.
                Thread.sleep(1000 * 10);

                ctx.collect(new Tuple2<Integer, String>(4, "北京"));

            }

            @Override
            public void cancel() {
                // Bounded source: run() terminates on its own, nothing to interrupt.
            }
        }).broadcast(CONFIG_DESCRIPTOR);


        source1.connect(source2).process(new BroadcastProcessFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {

            @Override
            public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx, Collector<Tuple3<Integer, String, String>> out) throws Exception {

                // Read-only view of the broadcast state: look up the city name for
                // this event (null until the matching config element has arrived).
                String city = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get(value.f0);
                System.out.println("processElement " + value + " 城市：" + city + " \tcurrentWatermark : " + ctx.currentWatermark());
            }

            @Override
            public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<Integer, String, String>> out) throws Exception {


                BroadcastState<Integer, String> state = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
                // Store the config so every parallel instance holds an identical copy.
                // The original obtained the state but never wrote to it, leaving the
                // broadcast state permanently empty.
                state.put(value.f0, value.f1);
                System.out.println("processBroadcastElement :" + value + "\t state: " + state);
            }
        });


        env.execute();

    }


    /**
     * Windowed join demo with a hand-written WindowAssigner: all elements are
     * assigned to the single GlobalWindow and the trigger fires on every element,
     * so matching pairs are joined continuously.
     */
    @Test
    public void connect3() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                new Tuple2<Integer, String>(1, "湖南"),
                new Tuple2<Integer, String>(2, "湖北"),
                new Tuple2<Integer, String>(3, "上海"));

        DataStreamSource<Tuple2<Integer, String>> source2 = env.addSource(new SourceFunction<Tuple2<Integer, String>>() {

            // Cooperative-cancellation flag; see connect() for rationale.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
                while (running) {

                    ThreadLocalRandom current = ThreadLocalRandom.current();

                    int i = current.nextInt(1, 4);

                    ctx.collect(Tuple2.of(i, UUID.randomUUID().toString()));

                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        source2.join(source1).where(new KeySelector<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                return value.f0;
            }
        }).equalTo(new KeySelector<Tuple2<Integer, String>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                return value.f0;

            }
        // Custom assigner declared over Object (a supertype of TaggedUnion) so that
        // stock triggers type-check as the default trigger. The original returned
        // null from every method, which would NPE as soon as the window operator
        // was instantiated.
        }).window(new WindowAssigner<Object, GlobalWindow>() {
            @Override
            public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
                // Everything goes into the one global window.
                return Collections.singletonList(GlobalWindow.get());
            }

            @Override
            public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
                // Fire on every element so join results are emitted continuously
                // (GlobalWindows' own default trigger never fires).
                return CountTrigger.of(1);
            }

            @Override
            public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
                return new GlobalWindow.Serializer();
            }

            @Override
            public boolean isEventTime() {
                // Processing-time semantics; no watermarks are involved here.
                return false;
            }
        })
                .apply(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {

                        System.out.println("first :" + first + "\t second：" + second);

                        // Emit the joined record (id, event payload, city name);
                        // the original returned null, which print() rendered as "null".
                        return Tuple3.of(first.f0, first.f1, second.f1);
                    }
                })
                .print();


        env.execute();

    }
}
